package com.xinrong.filter;

import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import com.xinrong.generated.TimeLimitFilterProtos;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.ArrayList;

/**
 * Created by xinrong3 on 14-5-28.
 * HBase Filter to control time,does not garantee the whole result
 */
public class TimeLimitFilter extends FilterBase{
    private long limit = 0;//nano time
    private long startTime = 0;
    public static final Log LOG = LogFactory.getLog(TimeLimitFilter.class);

    /**
     * Initializes filter with an integer  limit.  if scanned time exceed @limit (millisecond)
     * then we just stop
     * @param limit ms time .
     */
    public TimeLimitFilter(final long limit)
    {
        Preconditions.checkArgument(limit >= 0, "time limit must be positive %s", limit);
        //turn ms 2 nano time
        this.limit = limit*1000*1000;
    }


    /**
     * @return limit time
     */
    public long getLimit() {
        return limit;
    }



    @Override
    public ReturnCode filterKeyValue(Cell v)
    {
        if(startTime==0){
            startTime=System.nanoTime();
            return ReturnCode.INCLUDE;
        }else{
            if(System.nanoTime()-startTime>limit){
                LOG.warn("interrupt scan at "+Bytes.toLong(v.getQualifier()));
                return ReturnCode.NEXT_ROW;
            }else{
                return ReturnCode.INCLUDE;
            }
        }
    }

    @Override
    public void reset()
    {
        this.startTime =0;
    }

    public static Filter createFilterFromArguments(ArrayList<byte []> filterArguments) {
        Preconditions.checkArgument(filterArguments.size() == 1,
                "Expected 2 but got: %s", filterArguments.size());
        int limit = ParseFilter.convertByteArrayToInt(filterArguments.get(0));
        return new TimeLimitFilter(limit);
    }

    /**
     * @return The filter serialized using pb
     */
    public byte [] toByteArray() {
        TimeLimitFilterProtos.TimeLimitFilter.Builder builder =
                TimeLimitFilterProtos.TimeLimitFilter.newBuilder();
        builder.setLimit(this.limit/1000/1000);
        return builder.build().toByteArray();
    }

    /**
     * @param pbBytes A pb serialized {@link com.xinrong.filter.TimeLimitFilter} instance
     * @return An instance of {@link com.xinrong.filter.TimeLimitFilter} made from <code>bytes</code>
     * @throws org.apache.hadoop.hbase.exceptions.DeserializationException
     * @see #toByteArray
     */
    public static TimeLimitFilter parseFrom(final byte [] pbBytes)
            throws DeserializationException {
        TimeLimitFilterProtos.TimeLimitFilter proto;
        try {
            proto = TimeLimitFilterProtos.TimeLimitFilter.parseFrom(pbBytes);
        } catch (InvalidProtocolBufferException e) {
            throw new DeserializationException(e);
        }
        return new TimeLimitFilter(proto.getLimit());
    }

    /**
     * @param o
     * @return true if and only if the fields of the filter that are serialized
     * are equal to the corresponding fields in other.  Used for testing.
     */
    public boolean areSerializedFieldsEqual(Filter o) {
        if (o == this) return true;
        if (!(o instanceof TimeLimitFilter)) return false;

        TimeLimitFilter other = (TimeLimitFilter)o;
        return this.getLimit() == other.getLimit();
    }

    @Override
    public String toString() {
        return String.format("%s (%d)", this.getClass().getSimpleName(),
                this.limit/1000/1000);
    }
}
